AWS Lambdas

Descripcion

AWS Lambdas nos permiten ejecutar funciones de manera aislada.

Leer parámetros

Para leer los parámetros que se le pasan a una lambda utilizamos el parámetro event.

Si hemos definido variables en el json de configuración simplemente podemos acceder a ellas asi:

event['nomber_variable']

Si estamos llamando a la Lambda utilizando una URL, podemos hacer con una solicitud post o get y por lo tanto pasarle parámetros en el body del post o query parameters en la URL.

Para obtener el body del post lo hacemos de la siguiente manera:

post_body = json.loads(event['body'])

Y para los query parameters lo hacemos asi:

query_parameters = event["queryStringParameters"]

Este es un ejemplo en el que se obtienen los dos tipos de parámetros:

import json

def lambda_handler(event, context):
    post_body = json.loads(event['body'])
    query_parameters = event["queryStringParameters"]
    
    post_parameter = post_body['id']
    
    my_query_param = query_parameters['name']
    
    return {
        'statusCode': 200,
        'body': json.dumps('id is: ' + post_parameter + ' and name is: ' + my_query_param)
    }
Acceder a DynamoDB, Parameter Store y Http Requets

El siguiente ejemplo muestra como obtener los feeds de la API de Instagram, para ello obtiene el token para hacer la petición desde Parameter Store, hace la petición http con dicho token y despues los resultados los almacena en DynamoDB:

import boto3
import os
import time
from botocore.vendored import requests
    
url_instagram = 'https://graph.instagram.com/me/media'
url_refresh_token='https://graph.instagram.com/refresh_access_token'

access_token = None
ssm = None
table_feed = None

def lambda_handler(event, context):

    #Init Simple Systems Manager for access to Parameter Store
    init_ssm()

    refresh_token()
    
    init_dynamo()

    return get_feeds()
    
def init_ssm():
    parameter_name = '/Backend/SocialNetworks/AccessToken/Instagram'
    global access_token
    global ssm
    
    ssm = boto3.client('ssm')
    parameter = ssm.get_parameter(Name=parameter_name, WithDecryption=True)
    access_token = parameter['Parameter']['Value']
    
def init_dynamo():
    global table_feed
    
    dynamodb = boto3.resource("dynamodb")
    table_feed = dynamodb.Table("feedcontent3")
    
def refresh_token():
    global access_token
    global ssm
    params_token = {
        'grant_type': 'ig_refresh_token',
        'access_token': access_token
    }
    
    response_token = requests.get(url_refresh_token, params=params_token)
    
    if response_token.status_code == 200:
        new_token = response_token.json()
    else:
        raise Exception('Error: cannot refresh token')
    
    ssm.put_parameter(Name='/Backend/SocialNetworks/AccessToken/Instagram', Value=new_token['access_token'], Overwrite=True)
    
    access_token = new_token['access_token']
    
def get_feeds():
    global access_token
    global url_instagram
    global table_feed
    params_feed = {
        'fields': 'id,caption,timestamp,media_url',
        'access_token': access_token
    }
    
    response = requests.get(url_instagram, params=params_feed)
    
    if response.status_code == 200:
        data = response.json()
    else:
        raise Exception('Error: cannot get feeds from Instagram')

    for post in data["data"]:
        caption = ""
        if post.get("caption") != None:
            caption = post["caption"];
        
        try:
            table_feed.put_item(Item={"PostId": post["id"], "caption": caption, "timestamp": post["timestamp"], "mediaurl": post["media_url"], "tiempo": int(time.time())}, ConditionExpression='attribute_not_exists(PostId)')
        except:
            continue
            
    return {"message": "Success getting feeds from Instagram"}

Para poder ejecutar esta lambda tenemos que configurar los siguientes permisos:

Tags

AWS | Lambdas